ops-catalog: materialize stats to Bigtable #2955

Open
williamhbaker wants to merge 4 commits into master from wb/stats-next


Member

@williamhbaker williamhbaker commented May 18, 2026

Description:

  • Create a minimalist Bigtable Rust read client for reading catalog stats. As of these changes it is not wired up to anything, but it was developed alongside prototypes of several other uses of catalog stats in the code base: data-movement-stalled alerts and abandoned task detection (control plane agent), the OpenMetrics API, billing calculations, and speculative UI capabilities.
  • Run a Bigtable emulator locally for local stats, and materialize stats into it in parallel with the Postgres materialization. Some basic instructions are included for verifying the two databases are equivalent.
  • Add the new L2 stats derivation (now partitioned by grain) to the update_l2_reporting control plane API. This is needed for the local Bigtable stats materialization to work. Once the control plane API is deployed with this code, calling that endpoint will also publish the new L2 derivation in production.

Per the above, prior to deploying the new control plane API on this code, we should create the new reporting-dedicated data plane. Then just after the control plane API is deployed, call the update_l2_reporting endpoint with the default data plane parameter set to that reporting data plane so that the new L2 derivation is created there.

Move from its previous location in control-plane-api to the ops
crate so the upcoming Bigtable read client can share the definition.

Read side of the `catalog_stats_<grain>` tables that the new
stats-view materialization will write. Wraps the Bigtable data API
behind name/range/prefix fetchers, each returning a coroutine-backed
stream.

The new catalog-stats client and its integration tests need a Bigtable
endpoint; provide one via the cloud-sdk emulator wrapped in a systemd
unit and a mise task. The local control-plane agent depends on the
emulator so the dual-read path in `PGControlPlane` has somewhere to go,
and platform-test runs `local:bigtable` before nextest so the same
integration tests run in CI.

`update_l2_reporting` populates the new derivation in parallel with
the old one and stamps each L1 source with `not_before` = today so
the shadow doesn't backfill history during rollout.

@williamhbaker williamhbaker marked this pull request as ready for review May 18, 2026 18:05
self.read_rows(grain, row_set, vec![])
}

/// Streams every `catalog_stats_<grain>` row whose `catalog_name` stats
Member

Suggested change
- /// Streams every `catalog_stats_<grain>` row whose `catalog_name` stats
+ /// Streams every `catalog_stats_<grain>` row whose `catalog_name` starts
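
For context on the "starts with" wording in this doc comment: a prefix match over a sorted key space is commonly expressed as a half-open key range `[prefix, end)`, where `end` is the prefix with its last incrementable byte bumped by one. The sketch below illustrates only that general technique. The helper name `prefix_end` is hypothetical, and nothing here is taken from the PR's actual row key layout or its `RowSet` construction.

```rust
/// Returns the smallest key that sorts after every key starting with `prefix`,
/// for use as the exclusive end of a range scan.
///
/// Returns `None` when no such key exists (the prefix is empty or consists
/// only of 0xFF bytes), in which case the scan has no upper bound.
/// Hypothetical helper for illustration; not part of the PR.
pub fn prefix_end(prefix: &[u8]) -> Option<Vec<u8>> {
    let mut end = prefix.to_vec();
    while let Some(&last) = end.last() {
        if last == 0xFF {
            // A trailing 0xFF cannot be incremented, so drop it and carry
            // the increment into the preceding byte.
            end.pop();
        } else {
            *end.last_mut().unwrap() = last + 1;
            return Some(end);
        }
    }
    None
}
```

With this helper, every key beginning with `acmeCo/` falls in the range from `acmeCo/` (inclusive) to `acmeCo0` (exclusive), because `0` is the byte immediately after `/`.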

retries = 0;
}
ReadResult::Retry(status) => {
if retries >= MAX_RETRIES {
Member

Consider using the gazette crate strategy of having the caller be responsible for retry policy, with a wrapping result type that indicates number of attempts: https://github.com/estuary/flow/blob/master/crates/gazette/src/journal/read/mod.rs#L55-L58
This also lets the caller decide what to do with logging of the actual error.
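
A minimal sketch of the shape this suggestion describes, assuming a synchronous fetch closure in place of the real streaming read. The names `RetryError`, `RetryResult`, `Reader`, and `backoff` are hypothetical and are not copied from the gazette crate: the reader surfaces every failure tagged with its consecutive attempt count and never gives up by itself, so the caller owns the policy for backoff, logging, and giving up.

```rust
use std::time::Duration;

/// A transient error plus the number of consecutive failed attempts that
/// preceded it (0 for the first failure after a success).
#[derive(Debug, Clone, PartialEq)]
pub struct RetryError<E> {
    pub attempt: usize,
    pub inner: E,
}

pub type RetryResult<T, E> = Result<T, RetryError<E>>;

/// Wraps a fallible fetch and tracks consecutive failures.
/// It holds no retry limit: that decision belongs to the caller.
pub struct Reader<F> {
    attempt: usize,
    fetch: F,
}

impl<F, T, E> Reader<F>
where
    F: FnMut() -> Result<T, E>,
{
    pub fn new(fetch: F) -> Self {
        Self { attempt: 0, fetch }
    }

    /// Performs one fetch. A success resets the attempt counter; a failure
    /// is returned with the current count, which is then incremented.
    pub fn read_next(&mut self) -> RetryResult<T, E> {
        match (self.fetch)() {
            Ok(item) => {
                self.attempt = 0;
                Ok(item)
            }
            Err(inner) => {
                let attempt = self.attempt;
                self.attempt += 1;
                Err(RetryError { attempt, inner })
            }
        }
    }
}

/// One possible caller-side policy: exponential backoff starting at 100ms,
/// giving up (`None`) once `max_attempts` consecutive failures are reached.
pub fn backoff(attempt: usize, max_attempts: usize) -> Option<Duration> {
    if attempt >= max_attempts {
        None
    } else {
        Some(Duration::from_millis(100 * (1u64 << attempt.min(6))))
    }
}
```

A caller would loop on `read_next`, log `inner` at whatever level it prefers, and either sleep for the `backoff` duration or propagate the error when `backoff` returns `None`.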

grain: Grain,
row_set: bt::RowSet,
additional_filters: Vec<bt::RowFilter>,
) -> impl futures_core::Stream<Item = anyhow::Result<CatalogStats>> + '_ {
Member

Crates like gazette, tokens, and the V2 runtime use tonic::Result over anyhow::Result consistently for all streamed results and IPC (the V1 runtime does as well now, IIRC, with a couple of exceptions like derive-sqlite).
I've found over time that tonic::Status is nicer to work with as a fundamental type for protocol errors -- for one, it's Clone, where anyhow::Error isn't.
The rule of thumb I've adhered to is anyhow::Result for errors within a current application call stack, converted to/from tonic::Status when communicating that error from/to other systems and inter-process services.
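
The rule of thumb above can be sketched with the standard library alone. To keep the example dependency-free, `Status` and `Code` below are hypothetical stand-ins for `tonic::Status` and `tonic::Code`, and `AppError` (a boxed `dyn Error`) stands in for `anyhow::Error`; real code would use the crates' own types and conversions. The point illustrated is the boundary: a rich, non-`Clone` error inside the call stack, flattened to cloneable plain data when it crosses to another system.

```rust
use std::error::Error;
use std::fmt;

/// Stand-in for a protocol status code (hypothetical; not `tonic::Code`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Code {
    NotFound,
    Unavailable,
    Internal,
}

/// Stand-in for a protocol status: plain data, so it can derive `Clone`
/// and be handed to several consumers of a stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Status {
    pub code: Code,
    pub message: String,
}

impl fmt::Display for Status {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{:?}: {}", self.code, self.message)
    }
}

impl Error for Status {}

/// In-process error type, playing the role `anyhow::Error` plays in the
/// comment above. A boxed trait object is not `Clone`.
pub type AppError = Box<dyn Error + Send + Sync>;

/// Outbound boundary: flatten an in-process error into a `Status`.
/// An error that already is a `Status` keeps its code; anything else is
/// reported as `Internal` with its display text as the message.
pub fn to_status(err: AppError) -> Status {
    match err.downcast::<Status>() {
        Ok(status) => *status,
        Err(other) => Status {
            code: Code::Internal,
            message: other.to_string(),
        },
    }
}

/// Inbound boundary: lift a received `Status` into the in-process type so
/// it can propagate with `?` alongside other errors.
pub fn from_status(status: Status) -> AppError {
    Box::new(status)
}
```

Round-tripping a `Status` through `from_status` and `to_status` preserves its code, which is what lets a protocol error pass through application code and back out unchanged.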

row_set: bt::RowSet,
additional_filters: Vec<bt::RowFilter>,
) -> Self {
// `CellsPerColumnLimitFilter(1)`: materialize-bigtable writes
Member

Oh, fantastic that it has MVCC built in! So we don't even need to represent multiple versions ourselves...
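
As a rough illustration of the behavior being discussed: Bigtable keeps timestamped versions of each cell, and a cells-per-column limit of 1 returns only the most recent version of each column. The sketch below models that selection in memory. The `Cell` type, the `cells_per_column_limit` function, and the column names used with it are hypothetical; this is not the Bigtable client API or the PR's code.

```rust
use std::collections::BTreeMap;

/// One stored version of a column's value (hypothetical in-memory model).
#[derive(Debug, Clone, PartialEq)]
pub struct Cell {
    pub timestamp_micros: i64,
    pub value: Vec<u8>,
}

/// Models a cells-per-column limit over a single row: for every column,
/// keep only the `limit` newest cells, ordered newest first.
pub fn cells_per_column_limit(
    row: &BTreeMap<String, Vec<Cell>>,
    limit: usize,
) -> BTreeMap<String, Vec<Cell>> {
    row.iter()
        .map(|(column, cells)| {
            let mut newest = cells.clone();
            // Sort descending by timestamp so the latest write comes first.
            newest.sort_by(|a, b| b.timestamp_micros.cmp(&a.timestamp_micros));
            newest.truncate(limit);
            (column.clone(), newest)
        })
        .collect()
}
```

With `limit` set to 1, a reader sees exactly one value per column no matter how many times the writer has updated the row, which is why the reader does not need to represent versions itself.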

);
};

let models::DeriveUsing::Typescript(models::DeriveUsingTypescript {
Member

Good, I'm glad this is using TypeScript (I was thinking about it the other day and couldn't recall which derivation runtime it used).

TypeScript should be easy(*) to scale using the V2 runtime. We'll be able to support SQLite derivations in V2 and run them as we do today, but not scale them out for the time being (until we meaningfully tackle block devices).
